package com.wxgzh.service.impl;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.stereotype.Service;

import com.wxgzh.service.KafkaService;
@Service
public class KafkaServiceImpl implements KafkaService {

    /** Directory scanned for the JSON file produced each day. */
    private static final String JSON_FILE_DIRECTORY = "D:\\MyCoding\\项目库\\sq\\jsonFile\\";

    /** Kafka bootstrap server list (comma-separated if more than one broker). */
    private static final String BOOTSTRAP_SERVERS = "123.207.183.95:9092";

    /**
     * Creates a per-run topic named {@code topic_yyyyMMdd_HHmm}, then streams every
     * line of today's JSON file into that topic as a separate message. If no JSON
     * file was created today, only a notice is printed and nothing is produced.
     */
    @Override
    public void kafkaProduceJsonFile() {

        // Producer configuration. The broker receives keys and values as bytes,
        // so both must have a serializer; the key also drives partition selection.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "quickstart-producer");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Topic name is derived from the current minute, e.g. "topic_20240619_1711".
        String topicName = "topic_" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmm"));

        // Create the topic with a dedicated AdminClient configuration instead of
        // reusing (and mutating) the producer's Properties as before.
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        try (AdminClient adminClient = AdminClient.create(adminProps)) {
            int numPartitions = 1;
            short replicationFactor = 1;
            NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);
            KafkaFuture<Void> future = adminClient.createTopics(Collections.singleton(newTopic)).all();
            // Block until the broker acknowledges the topic creation.
            future.get();
            System.out.println("Topic " + topicName + " created successfully.");
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Locate today's JSON file; bail out early (no producer needed) if absent.
        File jsonFile = findJsonFileCreatedToday(JSON_FILE_DIRECTORY);
        if (jsonFile == null) {
            System.out.println("今天没有Json文件");
            return;
        }

        // try-with-resources guarantees the producer is flushed and closed even if
        // reading fails. BUG FIX: records are now sent to the topic created above
        // instead of the hard-coded literal "topic_name". JSON is read as UTF-8
        // rather than the platform default charset.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
             BufferedReader reader = Files.newBufferedReader(jsonFile.toPath(), StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                producer.send(new ProducerRecord<>(topicName, line));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Consumes messages from a fixed topic and prints each value to stdout.
     * Polls forever until an exception occurs; the consumer is always closed.
     */
    @Override
    public void kafkaConsumeJsonFile() {
        // Consumer configuration: deserializers mirror the producer's serializers;
        // "earliest" replays the topic from the beginning for a new group.
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "json-file-consumer-group");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // NOTE(review): this literal looks like a typo — "topic_1" + "20240619_1711"
        // does not match the "topic_yyyyMMdd_HHmm" names produced above. Kept as-is;
        // confirm the intended topic name.
        String topicName = "topic_120240619_1711";

        // try-with-resources replaces the manual finally { consumer.close(); }.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList(topicName));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: %s\n", record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the first regular {@code *.json} file under {@code directoryPath}
     * whose creation date (system default time zone) is today, or {@code null}
     * when the directory cannot be walked or no such file exists.
     *
     * @throws RuntimeException if a candidate file's attributes cannot be read
     *                          (preserves the original behavior)
     */
    private static File findJsonFileCreatedToday(String directoryPath) {
        LocalDate today = LocalDate.now();
        // Files.walk must be closed; findFirst short-circuits instead of
        // collecting every match as the old SimpleDateFormat version did.
        try (Stream<Path> paths = Files.walk(Paths.get(directoryPath))) {
            return paths
                    .filter(Files::isRegularFile)
                    .filter(path -> path.toString().endsWith(".json"))
                    .filter(path -> wasCreatedOn(path, today))
                    .findFirst()
                    .map(Path::toFile)
                    .orElse(null);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /** True when the file's creation timestamp falls on {@code day} in the default zone. */
    private static boolean wasCreatedOn(Path path, LocalDate day) {
        try {
            BasicFileAttributes attrs = Files.readAttributes(path, BasicFileAttributes.class);
            LocalDate created = attrs.creationTime().toInstant()
                    .atZone(ZoneId.systemDefault()).toLocalDate();
            return created.equals(day);
        } catch (IOException e) {
            // Original behavior: an unreadable file aborts the scan with a RuntimeException.
            throw new RuntimeException(e);
        }
    }
}
